#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#include <time.h>

#include "qbus.h"

struct Args {
    char topic[32];
    char payload[128];
    unsigned int seq;
};

static int s_quit = 0;

//DO NOT call consumer or bus functions!
void consumer_read(const char *top, unsigned int topic_len, const void *pl, unsigned int payload_len, unsigned int seq, void *args)
{
    struct Args *arg = (struct Args*)args;
    memcpy(arg->topic, top, topic_len> sizeof(arg->topic) ? sizeof(arg->topic)-1 : topic_len);
    memcpy(arg->payload, pl, payload_len> sizeof(arg->payload) ? sizeof(arg->payload)-1 : payload_len);
    arg->seq = seq;
}

static void* entry(void *args) {
    struct qconsumer *consumer = (struct qconsumer *)args;
    struct Args params;
    memset(&params, 0, sizeof(params));
    while(!s_quit) {
#if 1
        memset(&params, 0, sizeof(params));
        if(consumer->read_map(consumer, consumer_read, &params) == 0) {
            if(IS_CALL(params.seq)) {
                char tmp[256] = {0};
                snprintf(tmp, sizeof(tmp)-1, "seq %d topic %s, request: %s", params.seq, params.topic, params.payload);
                consumer->send_resp(consumer, params.seq, tmp, strlen(tmp));
            } else {
                printf("[GOT] topic [%s], payload [%s]\n", params.topic, params.payload);
            }
        } else {
            usleep(10*1000);
        }
#else
        if( consumer->read(consumer, params.topic, sizeof(params.topic), params.payload, sizeof(params.payload), NULL, &params.seq) == 0) {
            if(IS_CALL(params.seq)) {
                char tmp[256] = {0};
                snprintf(tmp, sizeof(tmp)-1, "seq %d topic %s, request: %s", params.seq, params.topic, params.payload);
                consumer->send_resp(consumer, params.seq, tmp, strlen(tmp));
            } else {
                printf("[GOT] topic [%s], payload [%s]\n", params.topic, params.payload);
            }
        } else {
            usleep(10*1000);
        }
#endif
    }
}

int msg_filter(const char *topic, unsigned int topic_len,
               const char *msg_topic, unsigned int msg_topic_len, const char *payload, unsigned int payload_len)
{
    printf("%s: topic %s\n", __FUNCTION__, topic);
    return 1;
}

static void event_handler(unsigned int id, const void *payload, unsigned int payload_len, void *args)
{
    struct qdispatcher *disp = ( struct qdispatcher *)args;
    printf("%s: id [%d] len [%d] payload [%s]\n", __FUNCTION__, id, payload_len, (const char*)payload);
    disp->remove_listener(disp, id, event_handler);
    disp->add_listener(disp, id, event_handler, args);
}

int main() {
    struct qbus *bus = create_qbus(10);
//    struct qconsumer *consumer = create_consumer(bus);
    struct qconsumer *consumer = create_consumer_with_filter(bus, msg_filter);
    struct qdispatcher *disp = create_qdispatcher(bus);
    do {
        disp->add_listener(disp, 1000, event_handler, disp);

        consumer->subscribe(consumer, "aa");
        consumer->subscribe(consumer, "bb");
        pthread_t tid;
        pthread_create(&tid, NULL, entry, consumer);
        int i;
        char tmp[128] = {0};
        for(i=0; i< 1000; i++) {
            if(i % 50 == 0) {
                usleep(10*1000);
            }
            snprintf(tmp, sizeof(tmp)-1, "[%d] %d: aa", (int)time(NULL), i);
            disp->broadcast(disp, 1000, tmp, strlen(tmp));
            bus->publish_ext(bus, "aa", "hhh_", 4, tmp, strlen(tmp));
//            publish_qmessage(bus, "aa", NULL, 0);
            snprintf(tmp, sizeof(tmp)-1, "bb %d", i);
            char tmp1[128] = {0};
            int ret = bus->call(bus, "bb", tmp, strlen(tmp), tmp1, sizeof(tmp1)-1, NULL, 30);
            if(ret != 0) {
                printf("call failed: %d\n", ret);
                continue;
            }
            printf("call resp => %s\n", tmp1);
        }
        for(int i=0; i<3; i++){
            usleep(1*1000*1000);
        }
        s_quit = 1;
        usleep(3 *1000*1000);
        bus->publish(bus, "aa", tmp, strlen(tmp));
    } while(0);
    if(consumer) {
        consumer->free(consumer);
    }
    if(disp) {
        disp->free(disp);
    }
    usleep(500*1000);
    if(bus) {
        bus->free(bus);
    }
    return 0;
}